Basic mrjob usage


In [9]:
%%writefile mr_word_count.py
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

        
if __name__ == '__main__':
    MRWordFrequencyCount.run()


Overwriting mr_word_count.py

In [28]:
%%writefile my_file.txt
Hello cat people! 

All cats and friends of cats are welcome to ride the catamaran!

Cats are friends to people in need.


Overwriting my_file.txt

Simplest way to run the job locally (the default inline runner, single Python process)


In [11]:
!python mr_word_count.py my_file.txt


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035730.154339
Running step 1 of 1...
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035730.154339/output...
"chars"	82
"lines"	3
"words"	15
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035730.154339...
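
Each output line is a tab-separated key/value pair; with mrjob's default output protocol both key and value are JSON-encoded, which is why the keys above are quoted.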

Simulate Hadoop with the local runner


In [12]:
!python mr_word_count.py -r local my_file.txt


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035731.612045
Running step 1 of 1...
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035731.612045/output...
"chars"	82
"lines"	3
"words"	15
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/mr_word_count.Jason.20160919.035731.612045...
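
Unlike the default inline runner, -r local launches each task in its own subprocess and simulates a few more Hadoop features (multiple mappers and reducers, for example), making it a better smoke test before submitting a job to a real cluster.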

Suppress the debugging messages with -q


In [66]:
!python mr_word_count.py -r local -q my_file.txt


"chars"	116
"lines"	5
"words"	22

Use a Python driver


In [165]:
# %%writefile test.py
from mr_word_count import MRWordFrequencyCount

mr_job = MRWordFrequencyCount(args=['my_file.txt'])

# with mr_job.make_runner() as runner:
#     runner.run() 
#     for line in runner.stream_output(): 
#         print(mr_job.parse_output_line(line))


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-165-2e143118b53f> in <module>()
      2 from mr_word_count import MRWordFrequencyCount
      3 
----> 4 mr_job = MRWordFrequencyCount(args=['my_file.txt'])
      5 
      6 # with mr_job.make_runner() as runner:

/Users/BlueOwl1/anaconda/lib/python3.5/site-packages/mrjob/job.py in __init__(self, args)
     97         ``python -m mrjob.job --help``
     98         """
---> 99         super(MRJob, self).__init__(self.mr_job_script(), args)
    100 
    101     @classmethod

/Users/BlueOwl1/anaconda/lib/python3.5/site-packages/mrjob/launch.py in __init__(self, script_path, args, from_cl)
    126         else:
    127             self.stdin = sys.stdin.buffer
--> 128             self.stdout = sys.stdout.buffer
    129             self.stderr = sys.stderr.buffer
    130 

AttributeError: 'OutStream' object has no attribute 'buffer'
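
The constructor fails under IPython because mrjob expects sys.stdout to expose a raw .buffer attribute, and IPython replaces it with an OutStream that has none. Running the same driver as a standalone script works. A minimal test.py, reconstructed from the commented-out cell above (stream_output and parse_output_line are the driver API of this mrjob version):

from mr_word_count import MRWordFrequencyCount

mr_job = MRWordFrequencyCount(args=['my_file.txt'])

with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        print(mr_job.parse_output_line(line))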

In [14]:
!python test.py


('chars', 82)
('lines', 3)
('words', 15)

In [102]:
# Hacky way to make it work
temp = !python test.py
for t in temp:
    print(eval(t))


('chars', 116)
('lines', 5)
('words', 22)
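
eval() will execute arbitrary code; since each captured line is just a tuple literal, ast.literal_eval is a safer drop-in:

import ast

temp = !python test.py
for t in temp:
    print(ast.literal_eval(t))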

Multi-step job


In [12]:
%%writefile top_word.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()


Writing top_word.py
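
Before running it, here is roughly what the two steps compute, as a plain-Python sketch over my_file.txt (no MapReduce involved):

import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")

# step 1: count each lower-cased word
with open('my_file.txt') as f:
    counts = Counter(w.lower() for w in WORD_RE.findall(f.read()))

# step 2: a single "reducer" picks the maximum (count, word) pair
print(max((n, w) for w, n in counts.items()))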


In [15]:
!python top_word.py -r local my_file.txt


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/top_word.Jason.20160918.220934.543803
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/top_word.Jason.20160918.220934.543803/output...
2	"cats"
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/top_word.Jason.20160918.220934.543803...

Practice problems

Word count where we try not to use the built-in abstractions, exercising counters and status messages instead


In [49]:
import sys

sys.stderr

In [50]:
print("cat", file=sys.stderr)


cat
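
This stderr detour is the point: mrjob, following the Hadoop Streaming convention, sends side information over stderr so that stdout stays free for key/value output. increment_counter writes reporter:counter:<group>,<counter>,<amount> lines and set_status writes reporter:status:<message> lines, both of which appear in the next example.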

In [91]:
%%writefile word_count_no_abstractions.py

import re
import sys
from mrjob.job import MRJob
from mrjob.step import MRStep


WORD_RE = re.compile(r"[\w']+")

class WordCount(MRJob):
    
    def steps(self):
        return [
                MRStep(mapper=self.mapper_emit_words,
                       combiner=self.combiner_count_words,
                       reducer=self.reducer_count_words),
                MRStep(mapper=self.mapper_make_cap,
                       reducer_init=self.reducer_init_for_status,
                       reducer=self.reducer_find_max_word)
                ]
    
    def mapper_emit_words(self, _, line):
        self.increment_counter('stats', 'characters', len(line))
        self.increment_counter('stats', 'lines', 1)
        for word in WORD_RE.findall(line.lower()):
            self.increment_counter('stats', 'words', 1)
            yield (word, 1)
            
    def combiner_count_words(self, word, count):
        yield (word, sum(count))
        
    def reducer_count_words(self, word, count):
        yield (word, sum(count))
    
    
    def mapper_make_cap(self, word, count):
        # funnel every (WORD, count) pair to a single reducer via the None key
        yield None, (word.upper(), count)

    def reducer_init_for_status(self):
        # runs once per reducer task, before any values arrive
        self.status = 0

    def reducer_find_max_word(self, _, word_count):
        # report the type of the value stream once, as a status message
        if self.status == 0:
            self.set_status(str(type(word_count)))
            self.status = 1
        yield max(word_count, key=lambda x: x[1])

if __name__ == "__main__":
    WordCount.run()


Overwriting word_count_no_abstractions.py

In [92]:
!python word_count_no_abstractions.py my_file.txt


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/word_count_no_abstractions.Jason.20160919.061619.467904
Running step 1 of 2...
Counters: 3
	stats
		characters=116
		lines=5
		words=22
Counters: 3
	stats
		characters=116
		lines=5
		words=22
Running step 2 of 2...
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/word_count_no_abstractions.Jason.20160919.061619.467904/output...
"CATS"	3
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/word_count_no_abstractions.Jason.20160919.061619.467904...

Log processing


In [108]:
# Make a sample of the file
!head -n 2000 data/anonymous-msweb.data.txt > data/anonymous-msweb.data.sample.txt

In [144]:
# View a few instances of the file
!tail -n 10 data/anonymous-msweb.data.sample.txt


V,1034,1
C,"10398",10398
V,1036,1
V,1040,1
C,"10399",10399
V,1008,1
V,1052,1
V,1018,1
C,"10400",10400
V,1004,1
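
In this dataset a C row opens a user session ("case") and the V rows that follow record the pages ("votes") that user visited; the second field of a V row is the page ID. Only the V rows matter for counting views, which is what the job below filters on.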

In [147]:
%%writefile AtLeastKViews.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.compat import jobconf_from_env

class AtLeastKViews(MRJob):
    """
    Given the Vlog dataset, returns all the pages that have over K views
    along with the view count.
    
    usage: 
    $ python AtLeastKViews.py [--jobconf "k=1000"] [-q] <data>
    """
    def steps(self):
        """
        Defines a single-step job.
        """
        mr_steps = [MRStep(mapper=self.mapper_filter_only_Vs,
                           combiner=self.combiner_sum_sites,
                           reducer_init=self.reducer_init,
                           reducer=self.reducer_sum_and_filter_sites)]
        return mr_steps
    
    def mapper_filter_only_Vs(self, _, line):
        # keep only the "V" (page visit) records
        if line.startswith("V"):
            terms = line.split(",")
            yield (terms[1], 1)
            
    def combiner_sum_sites(self, site, count):
        # count is a generator of 1s for this site, local to one mapper task
        yield (site, sum(count))
        
    def reducer_init(self):
        # read K from the job configuration, falling back to a default of 50
        self.greater_than = int(jobconf_from_env("k", default=50))

    def reducer_sum_and_filter_sites(self, site, count):
        site_count = sum(count)
        if site_count >= self.greater_than:
            yield (site, site_count)
        
if __name__ == "__main__":
    AtLeastKViews.run()


Overwriting AtLeastKViews.py
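
jobconf_from_env works here because mrjob's simulated runners export --jobconf settings into each task's environment. A more self-documenting alternative is a pass-through option; the sketch below is hypothetical and uses configure_args/add_passthru_arg from newer mrjob releases (older releases spell these configure_options/add_passthrough_option):

from mrjob.job import MRJob

class AtLeastKViewsOpt(MRJob):

    def configure_args(self):
        super(AtLeastKViewsOpt, self).configure_args()
        # --min-views replaces --jobconf "k=..."
        self.add_passthru_arg('--min-views', type=int, default=50)

    def mapper(self, _, line):
        # keep only the "V" (page visit) records
        if line.startswith("V"):
            yield line.split(",")[1], 1

    def reducer(self, site, counts):
        site_count = sum(counts)
        if site_count >= self.options.min_views:
            yield site, site_count

if __name__ == "__main__":
    AtLeastKViewsOpt.run()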

In [155]:
!python AtLeastKViews.py --jobconf "k=1000" -q data/anonymous-msweb.data.txt


"1001"	4451
"1003"	2968
"1004"	8463
"1008"	10836
"1009"	4628
"1017"	5108
"1018"	5330
"1020"	1087
"1025"	2123
"1026"	3220
"1030"	1115
"1032"	1446
"1034"	9383
"1035"	1791
"1037"	1160
"1038"	1110
"1040"	1506
"1041"	1500